diff options
Diffstat (limited to 'qpid/java/broker/src/main')
32 files changed, 1202 insertions, 20 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index e0d325a5b0..fc16b75e1a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -94,6 +94,7 @@ public class ServerConfiguration implements SignalHandler envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer"); envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay"); envVarMap.put("QPID_ENABLEPOOLEDALLOCATOR", "advanced.enablePooledAllocator"); + envVarMap.put("QPID_STATUS-UPDATES", "status-updates"); } public ServerConfiguration(File configurationURL) throws ConfigurationException @@ -186,7 +187,12 @@ public class ServerConfiguration implements SignalHandler } return conf; } - + + public boolean getStatusEnabled() + { + return getConfig().getBoolean("status-updates", true); + } + // Our configuration class needs to make the interpolate method // public so it can be called below from the config method. private static class MyConfiguration extends CompositeConfiguration @@ -541,4 +547,13 @@ public class ServerConfiguration implements SignalHandler getConfig().getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD)); } + + public boolean getStatusUpdates() + { + // Retrieve the setting from configuration but default to on. + String value = getConfig().getString("status-updates", "on"); + + return value.equalsIgnoreCase("on"); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 054674aed4..5d7adc6371 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -54,8 +54,11 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB AMQProtocolSession session = stateManager.getProtocolSession(); VirtualHost virtualHost = session.getVirtualHost(); - final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore() - ); + final AMQChannel channel = new AMQChannel(session,channelId, + virtualHost.getMessageStore()); + + + session.addChannel(channel); ChannelOpenOkBody response; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java new file mode 100644 index 0000000000..e9cc7449cd --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/BrokerMessages.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +public class BrokerMessages +{ + + public static LogMessage BRK_1001(String version, String build) + { + return new LogMessage() + { + + }; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java new file mode 100644 index 0000000000..203a5d160d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +/** + * LogActor the entity that is stored as in a ThreadLocal and used to perform logging. + * + * The actor is responsible for formatting its display name for the log entry. + * + * The actor performs the requested logging. + */ +public interface LogActor +{ + /** + * Logs the specified LogMessage about the LogSubject + * + * Currently logging has a global setting however this will later be revised and + * as such the LogActor will need to take into consideration any new configuration + * as a means of enabling the logging of LogActors and LogSubjects. + * + * @param subject The subject that is being logged + * @param message The message to log + */ + public void message(LogSubject subject, LogMessage message); +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java new file mode 100644 index 0000000000..5c112ff100 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogMessage.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +public interface LogMessage +{ + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java new file mode 100644 index 0000000000..e53ef364bf --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogSubject.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +/** + * Each LogSubject that wishes to be logged will implement this to provide their + * own display representation. + * + * The display representation is retrieved through the toString() method. + */ +public interface LogSubject +{ + /** + * Logs the message as provided by String.valueOf(message). + * + * @returns String the display representation of this LogSubject + */ + public String toString(); +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java new file mode 100644 index 0000000000..7d515f3263 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RawMessageLogger.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +/** + * A RawMessage Logger takes the given String and any Throwable and writes the + * data to its resource. + */ +public interface RawMessageLogger +{ + + /** + * Log the given message. + * + * @param message String to log. + */ + public void rawMessage(String message); + + /** + * Log the message and formatted stack trace for any Throwable. + * + * @param message String to log. + * @param throwable Throwable for which to provide stack trace. + */ + public void rawMessage(String message, Throwable throwable); +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java new file mode 100644 index 0000000000..cd7992faa7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +/** + * The RootMessageLogger is used by the LogActors to query if + * logging is enabled for the requested message and to provide the actual + * message that should be logged. + */ +public interface RootMessageLogger +{ + /** + * Determine if the LogSubject and the LogActor should be + * generating log messages. + * + * @param subject The subject of this log request + * @param actor The actor requesting the logging + * @return boolean true if the message should be logged. + */ + boolean isMessageEnabled(LogActor actor, LogSubject subject); + + + /** + * Log the raw message to the configured logger. + * + * @param message The message to log + */ + public void rawMessage(String message); + + /** + * Log the raw message to the configured logger. + * Along with a formated stack trace from the Throwable. + * + * @param message The message to log + * @param throwable Optional Throwable that should provide stact trace + */ + void rawMessage(String message, Throwable throwable); +}
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java new file mode 100644 index 0000000000..9270c316b6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging; + +import org.apache.qpid.server.configuration.ServerConfiguration; + +public class RootMessageLoggerImpl implements RootMessageLogger +{ + private boolean _enabled; + + RawMessageLogger _rawLogger; + private static final String MESSAGE = "MESSAGE "; + + public RootMessageLoggerImpl(ServerConfiguration configuration, RawMessageLogger rawLogger) + { + _enabled = configuration.getStatusUpdates(); + _rawLogger = rawLogger; + } + + public boolean isMessageEnabled(LogActor actor, LogSubject subject) + { + return _enabled; + } + + public void rawMessage(String message) + { + _rawLogger.rawMessage(MESSAGE + message); + } + + public void rawMessage(String message, Throwable throwable) + { + _rawLogger.rawMessage(MESSAGE + message, throwable); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java new file mode 100644 index 0000000000..3170040a77 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.text.MessageFormat; + +/** + * An AMQPChannelActor represtents a connection through the AMQP port with an + * associated Channel. + * + * <p/> + * This is responsible for correctly formatting the LogActor String in the log + * <p/> + * [con:1(user@127.0.0.1/)/ch:1] + * <p/> + * To do this it requires access to the IO Layers as well as a Channel + */ +public class AMQPChannelActor extends AbstractActor +{ + + /** + * Create a new ChannelActor + * + * @param channel The Channel for this LogActor + * @param rootLogger The root Logger that this LogActor should use + */ + public AMQPChannelActor(AMQChannel channel, RootMessageLogger rootLogger) + { + super(rootLogger); + + AMQProtocolSession session = channel.getProtocolSession(); + + /** + * LOG FORMAT used by the AMQPConnectorActor follows + * ChannelLogSubject.CHANNEL_FORMAT : + * con:{0}({1}@{2}/{3})/ch:{4} + * + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + */ + _logString = "[" + MessageFormat.format(ChannelLogSubject.CHANNEL_FORMAT, + session.getSessionID(), + session.getAuthorizedID().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName(), + channel.getChannelId()) + + "] "; + } +} + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java new file mode 100644 index 0000000000..432b1d8203 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; +import org.apache.qpid.server.protocol.AMQProtocolSession; + +import java.text.MessageFormat; + +/** + * An AMQPConnectionActor represtents a connectionthrough the AMQP port. + * <p/> + * This is responsible for correctly formatting the LogActor String in the log + * <p/> + * [ con:1(user@127.0.0.1/) ] + * <p/> + * To do this it requires access to the IO Layers. + */ +public class AMQPConnectionActor extends AbstractActor +{ + /** + * 0 - Connection ID + * 1 - Remote Address + */ + public static String SOCKET_FORMAT = "con:{0}({1})"; + + /** + * LOG FORMAT for the ConnectionLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + */ + public static final String USER_FORMAT = "con:{0}({1}@{2})"; + + public AMQPConnectionActor(AMQProtocolSession session, RootMessageLogger rootLogger) + { + super(rootLogger); + + _logString = "[" + MessageFormat.format(SOCKET_FORMAT, + session.getSessionID(), + session.getRemoteAddress()) + + + "] "; + } + + /** + * Call when the connection has been authorized so that the logString + * can be updated with the new user identity. + * + * @param session the authorized session + */ + public void connectionAuthorized(AMQProtocolSession session) + { + _logString = "[" + MessageFormat.format(USER_FORMAT, + session.getSessionID(), + session.getAuthorizedID().getName(), + session.getRemoteAddress()) + + "] "; + + } + + /** + * Called once the user has been authenticated and they are now selecting + * the virtual host they wish to use. + * + * @param session the session that now has a virtualhost associated with it. + */ + public void virtualHostSelected(AMQProtocolSession session) + { + + /** + * LOG FORMAT used by the AMQPConnectorActor follows + * ConnectionLogSubject.CONNECTION_FORMAT : + * con:{0}({1}@{2}/{3}) + * + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + */ + _logString = "[" + MessageFormat.format(ConnectionLogSubject.CONNECTION_FORMAT, + session.getSessionID(), + session.getAuthorizedID().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName()) + + "] "; + + } +} + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java new file mode 100644 index 0000000000..95f2dc9ff6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.RootMessageLogger; + +public abstract class AbstractActor implements LogActor +{ + protected String _logString; + protected RootMessageLogger _rootLogger; + + public AbstractActor(RootMessageLogger rootLogger) + { + _rootLogger = rootLogger; + } + + public void message(LogSubject subject, LogMessage message) + { + if (_rootLogger.isMessageEnabled(this, subject)) + { + _rootLogger.rawMessage(_logString + String.valueOf(subject) + message); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java new file mode 100644 index 0000000000..221e57eebb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.LogActor; + +import java.util.LinkedList; +import java.util.Deque; + +public class CurrentActor +{ + private static final ThreadLocal<Deque<LogActor>> _currentActor = new ThreadLocal<Deque<LogActor>>() + { + protected Deque<LogActor> initialValue() + { + return new LinkedList<LogActor>(); + } + }; + + public static void set(LogActor actor) + { + Deque<LogActor> stack = _currentActor.get(); + stack.addFirst(actor); + } + + public static void remove() + { + Deque<LogActor> stack = _currentActor.get(); + stack.remove(); + } + + public static LogActor get() + { + return _currentActor.get().peek(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java new file mode 100644 index 0000000000..58d55a13bb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.actors; + +import org.apache.qpid.server.logging.RootMessageLogger; + +import java.text.MessageFormat; +import java.security.Principal; + +public class ManagementActor extends AbstractActor +{ + + /** + * LOG FORMAT for the ManagementActor, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + */ + public static final String MANAGEMENT_FORMAT = "mng:{0}({1}@{2})"; + + /** + * //todo Correct interface to provide connection details + * @param user + * @param rootLogger The RootLogger to use for this Actor + */ + public ManagementActor(Principal user, RootMessageLogger rootLogger) + { + super(rootLogger); + + _logString = "["+ MessageFormat.format(MANAGEMENT_FORMAT, + "<MNG:ConnectionID>", + user.getName(), + "<MNG:RemoteAddress>") + + "] "; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java new file mode 100644 index 0000000000..3774155626 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.logging.rawloggers; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.server.logging.RawMessageLogger; + +public class Log4jMessageLogger implements RawMessageLogger +{ + public static final String DEFAULT_LEVEL = "INFO"; + public static final String DEFAULT_LOGGER = "qpid.message"; + private Level _level; + private Logger _rawMessageLogger; + + public Log4jMessageLogger() + { + this(DEFAULT_LEVEL, DEFAULT_LOGGER); + } + + public Log4jMessageLogger(String level, String logger) + { + _level = Level.toLevel(level); + + _rawMessageLogger = Logger.getLogger(logger); + } + + public void rawMessage(String message) + { + rawMessage(message, null); + } + + public void rawMessage(String message, Throwable throwable) + { + _rawMessageLogger.log(_level, message, throwable); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java new file mode 100644 index 0000000000..4fb5bdcc93 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.logging.LogSubject; + +import java.text.MessageFormat; + +/** + * The LogSubjects all have a similar requriement to format their output and + * provide the String value. + * + * This Abstract LogSubject provides this basic functionality, allowing the + * actual LogSubjects to provide their formating and data. + */ +public abstract class AbstractLogSubject implements LogSubject +{ + /** + * The logString that will be returned via toString + */ + protected String logString; + + /** + * Set the toString logging of this LogSubject. Based on a format provided + * by format and the var args. + * @param format The Message to format + * @param args The values to put in to the message. + */ + protected void setLogStringWithFormat(String format, Object... args) + { + logString = "[" + MessageFormat.format(format, args) + "] "; + } + + /** + * ToString is how the Logging infrastructure will get the text for this + * LogSubject + * + * @return String representing this LogSubject + */ + @Override + public String toString() + { + return logString; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java new file mode 100644 index 0000000000..fd171fea5a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; + +public class BindingLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ChannelLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Virtualhost Name + * 1 - Exchange Type + * 2 - Exchange Name + * 3 - Queue Name + * 4 - Binding RoutingKey + */ + protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})/qu({3})/rk({4})"; + + /** + * Create a BindingLogSubject that Logs in the following format. + * + * [ vh(/)/ex(amq.direct)/qu(testQueue)/bd(testQueue) ] + * + * @param routingKey + * @param exchange + * @param queue + */ + public BindingLogSubject(AMQShortString routingKey, Exchange exchange, + AMQQueue queue) + { + setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(), + exchange.getType(), + exchange.getName(), + queue.getName(), + routingKey); + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java new file mode 100644 index 0000000000..1b22de6d01 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQProtocolSession; + +public class ChannelLogSubject extends AbstractLogSubject +{ + /** + * LOG FORMAT for the ChannelLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + * 4 - Channel ID + */ + public static String CHANNEL_FORMAT = ConnectionLogSubject.CONNECTION_FORMAT + + "/ch:{4}"; + + public ChannelLogSubject(AMQChannel channel) + { + AMQProtocolSession session = channel.getProtocolSession(); + + // Provide the value for the 4th replacement. + setLogStringWithFormat(CHANNEL_FORMAT, + session.getSessionID(), + session.getAuthorizedID().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName(), + channel.getChannelId()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java new file mode 100644 index 0000000000..e07dbcda23 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.protocol.AMQProtocolSession; + +/** The Connection LogSubject */ +public class ConnectionLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ConnectionLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Connection ID + * 1 - User ID + * 2 - IP + * 3 - Virtualhost + */ + public static final String CONNECTION_FORMAT = "con:{0}({1}@{2}/{3})"; + + public ConnectionLogSubject(AMQProtocolSession session) + { + setLogStringWithFormat(CONNECTION_FORMAT, session.getSessionID(), + session.getAuthorizedID().getName(), + session.getRemoteAddress(), + session.getVirtualHost().getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java new file mode 100644 index 0000000000..21e5f5e4ce --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class ExchangeLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ExchangeLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Virtualhost Name + * 1 - Exchange Type + * 2 - Exchange Name + */ + protected static String BINDING_FORMAT = "vh(/{0})/ex({1}/{2})"; + + /** Create an ExchangeLogSubject that Logs in the following format. */ + public ExchangeLogSubject(Exchange exchange, VirtualHost vhost) + { + setLogStringWithFormat(BINDING_FORMAT, vhost.getName(), + exchange.getType(), exchange.getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java new file mode 100644 index 0000000000..89f31ef477 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/QueueLogSubject.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.queue.AMQQueue; + +public class QueueLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the ExchangeLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Virtualhost name + * 1 - queue name + */ + protected static String BINDING_FORMAT = "vh(/{0})/qu({1})"; + + /** Create an QueueLogSubject that Logs in the following format. */ + public QueueLogSubject(AMQQueue queue) + { + setLogStringWithFormat(BINDING_FORMAT, + queue.getVirtualHost().getName(), + queue.getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java new file mode 100644 index 0000000000..b68ef2e9a9 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.subjects; + +import org.apache.qpid.server.subscription.Subscription; + +public class SubscriptionLogSubject extends AbstractLogSubject +{ + + /** + * LOG FORMAT for the SubscriptionLogSubject, + * Uses a MessageFormat call to insert the requried values according to + * these indicies: + * + * 0 - Subscription ID + * 1 - queue name + */ + protected static String BINDING_FORMAT = "sub:{0}(qu({1}))"; + + /** + * Create an QueueLogSubject that Logs in the following format. + * + * @param subscription + */ + public SubscriptionLogSubject(Subscription subscription) + { + + setLogStringWithFormat(BINDING_FORMAT, subscription.getSubscriptionID(), + subscription.getQueue().getName()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 205ca73f13..e46a52f3bf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -21,26 +21,39 @@ package org.apache.qpid.server.protocol; import org.apache.log4j.Logger; - +import org.apache.mina.common.CloseFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; -import org.apache.mina.common.CloseFuture; import org.apache.mina.transport.vmpipe.VmPipeAddress; - import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.handler.ServerMethodDispatcherImpl; +import org.apache.qpid.server.logging.actors.AMQPConnectionActor; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -54,7 +67,6 @@ import org.apache.qpid.transport.Sender; import javax.management.JMException; import javax.security.sasl.SaslServer; - import java.net.InetSocketAddress; import java.net.SocketAddress; import java.security.Principal; @@ -64,6 +76,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicLong; public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { @@ -71,6 +84,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); + private static final AtomicLong idGenerator = new AtomicLong(0); + // to save boxing the channelId and looking up in a map... cache in an array the low numbered // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; @@ -120,6 +135,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L; private org.apache.mina.common.WriteFuture _lastWriteFuture; + // Create a simple ID that increments for ever new Session + private final long _sessionID = idGenerator.getAndIncrement(); + + private AMQPConnectionActor _actor; + public ManagedObject getManagedObject() { return _managedObject; @@ -134,6 +154,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _codecFactory = codecFactory; + _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); + try { IoServiceConfig config = session.getServiceConfig(); @@ -158,6 +180,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _codecFactory = codecFactory; + _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); } private AMQProtocolSessionMBean createMBean() throws AMQException @@ -183,6 +206,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return (AMQProtocolSession) minaProtocolSession.getAttachment(); } + public long getSessionID() + { + return _sessionID; + } + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; @@ -235,6 +263,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } + CurrentActor.set(_actor); try { body.handle(channelId, this); @@ -244,7 +273,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable closeChannel(channelId); throw e; } - + finally + { + CurrentActor.remove(); + } } private void protocolInitiationReceived(ProtocolInitiation pi) @@ -796,6 +828,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { _virtualHost = virtualHost; + _actor.virtualHostSelected(this); + _virtualHost.getConnectionRegistry().registerConnection(this); _managedObject = createMBean(); @@ -820,6 +854,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void setAuthorizedID(Principal authorizedID) { _authorizedID = authorizedID; + + // Let the actor know that this connection is now Authorized + _actor.connectionAuthorized(this); } public Principal getAuthorizedID() @@ -827,6 +864,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return _authorizedID; } + public SocketAddress getRemoteAddress() + { + return _minaProtocolSession.getRemoteAddress(); + } + public MethodRegistry getMethodRegistry() { return MethodRegistry.getMethodRegistry(getProtocolVersion()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 1bac601225..f721730d9c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -36,6 +36,7 @@ import java.security.Principal; public interface AMQProtocolSession extends AMQVersionAwareProtocolSession { + long getSessionID(); public static final class ProtocolSessionIdentifier { @@ -198,6 +199,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession /** @return a Principal that was used to authorized this session */ Principal getAuthorizedID(); + public java.net.SocketAddress getRemoteAddress(); + public MethodRegistry getMethodRegistry(); public MethodDispatcher getMethodDispatcher(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index e994967dc5..8c66508307 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -202,6 +202,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException { + exchange.registerQueue(routingKey, this, arguments); if (isDurable() && exchange.isDurable()) { @@ -209,6 +210,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } _bindings.addBinding(routingKey, arguments, exchange); +// ExchangeBinding binding = new ExchangeBinding(routingKey, exchange, arguments); + + //fixme MR logging in progress +// _bindings.addBinding(binding); +// +// if (_logger.isMessageEnabled(binding)) +// { +// _logger.message(binding, "QM-1001 : Created Binding"); +// } } public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 22b4623ae1..b58b849133 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.logging.RootMessageLogger; /** * An abstract application registry that provides access to configuration information and handles the @@ -70,6 +71,8 @@ public abstract class ApplicationRegistry implements IApplicationRegistry protected PluginManager _pluginManager; + protected RootMessageLogger _rootMessageLogger; + static { Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService())); @@ -287,4 +290,9 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return _pluginManager; } + public RootMessageLogger getRootMessageLogger() + { + return _rootMessageLogger; + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index 39164883f9..31a85b878a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -33,6 +33,8 @@ import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalD import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger; public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { @@ -44,9 +46,12 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { + _rootMessageLogger = new RootMessageLoggerImpl(_configuration, + new Log4jMessageLogger()); + initialiseManagedObjectRegistry(); - _virtualHostRegistry = new VirtualHostRegistry(); + _virtualHostRegistry = new VirtualHostRegistry(this); _pluginManager = new PluginManager(_configuration.getPluginDirectory()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index bbfda3addc..7d17639f22 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.mina.common.IoAcceptor; public interface IApplicationRegistry @@ -69,6 +70,8 @@ public interface IApplicationRegistry PluginManager getPluginManager(); + RootMessageLogger getRootMessageLogger(); + /** * Register any acceptors for this registry * @param bindAddress The address that the acceptor has been bound with diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 9419572399..19eabce9ff 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -52,6 +52,8 @@ public interface Subscription AMQShortString getConsumerTag(); + long getSubscriptionID(); + boolean isSuspended(); boolean hasInterest(QueueEntry msg); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 7aa9d1e3af..51da884d1e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -66,6 +67,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); private final Lock _stateChangeLock; + private static final AtomicLong idGenerator = new AtomicLong(0); + // Create a simple ID that increments for ever new Subscription + private final long _subscriptionID = idGenerator.getAndIncrement(); + + static final class BrowserSubscription extends SubscriptionImpl { public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession, @@ -526,6 +532,11 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return _consumerTag; } + public long getSubscriptionID() + { + return _subscriptionID; + } + public AMQProtocolSession getProtocolSession() { return _channel.getProtocolSession(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index eda2d3a94e..9ef1e029d3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -20,17 +20,12 @@ */ package org.apache.qpid.server.util; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Properties; - -import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.MapConfiguration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.logging.RootMessageLoggerImpl; +import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger; import org.apache.qpid.server.management.NoopManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -41,6 +36,10 @@ import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticat import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import java.util.Arrays; +import java.util.Collection; +import java.util.Properties; + public class NullApplicationRegistry extends ApplicationRegistry { public NullApplicationRegistry() throws ConfigurationException @@ -51,9 +50,11 @@ public class NullApplicationRegistry extends ApplicationRegistry public void initialise() throws Exception { _logger.info("Initialising NullApplicationRegistry"); - + + _rootMessageLogger = new RootMessageLoggerImpl(_configuration, new Log4jMessageLogger()); + _configuration.setHousekeepingExpiredMessageCheckPeriod(200); - + Properties users = new Properties(); users.put("guest", "guest"); @@ -65,7 +66,7 @@ public class NullApplicationRegistry extends ApplicationRegistry _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); _managedObjectRegistry = new NoopManagedObjectRegistry(); - _virtualHostRegistry = new VirtualHostRegistry(); + _virtualHostRegistry = new VirtualHostRegistry(this); PropertiesConfiguration vhostProps = new PropertiesConfiguration(); VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps); VirtualHost dummyHost = new VirtualHost(hostConfig); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java index 27917fac8a..5543adbeb5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java @@ -20,6 +20,9 @@ */
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -32,6 +35,12 @@ public class VirtualHostRegistry private String _defaultVirtualHostName;
+ private ApplicationRegistry _applicationRegistry;
+
+ public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
+ {
+ _applicationRegistry = applicationRegistry;
+ }
public synchronized void registerVirtualHost(VirtualHost host) throws Exception
{
@@ -67,4 +76,9 @@ public class VirtualHostRegistry {
return new ArrayList<VirtualHost>(_registry.values());
}
+
+ public ApplicationRegistry getApplicationRegistry()
+ {
+ return _applicationRegistry;
+ }
}
|
